/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.extensions.sorter;



import com.bff.gaia.unified.sdk.values.KV;



import java.io.IOException;

import java.io.Serializable;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/**

 * {@link Sorter} that will use in memory sorting until the values can't fit into memory and will

 * then fall back to external sorting.

 */

public class BufferedExternalSorter implements Sorter {

  public static Options options() {

    return new Options("/tmp", 100);

  }



  /** Contains configuration for the sorter. */

  public static class Options implements Serializable {

    private final String tempLocation;

    private final int memoryMB;



    private Options(String tempLocation, int memoryMB) {

      this.tempLocation = tempLocation;

      this.memoryMB = memoryMB;

    }



    /** Sets the path to a temporary location where the sorter writes intermediate files. */

    public Options withTempLocation(String tempLocation) {

      checkArgument(

          !tempLocation.startsWith("gs://"),

          "BufferedExternalSorter does not support GCS temporary location");



      return new Options(tempLocation, memoryMB);

    }



    /** Returns the configured temporary location. */

    public String getTempLocation() {

      return tempLocation;

    }



    /**

     * Sets the size of the memory buffer in megabytes. This controls both the buffer for initial in

     * memory sorting and the buffer used when external sorting. Must be greater than zero and less

     * than 2048.

     */

    public Options withMemoryMB(int memoryMB) {

      checkArgument(memoryMB > 0, "memoryMB must be greater than zero");

      // Hadoop's external sort stores the number of available memory bytes in an int, this prevents

      // overflow

      checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");

      return new Options(tempLocation, memoryMB);

    }



    /** Returns the configured size of the memory buffer. */

    public int getMemoryMB() {

      return memoryMB;

    }

  }



  private final ExternalSorter externalSorter;

  private InMemorySorter inMemorySorter;



  boolean inMemorySorterFull;



  BufferedExternalSorter(ExternalSorter externalSorter, InMemorySorter inMemorySorter) {

    this.externalSorter = externalSorter;

    this.inMemorySorter = inMemorySorter;

  }



  public static BufferedExternalSorter create(Options options) {

    ExternalSorter.Options externalSorterOptions = new ExternalSorter.Options();

    externalSorterOptions.setMemoryMB(options.getMemoryMB());

    externalSorterOptions.setTempLocation(options.getTempLocation());



    InMemorySorter.Options inMemorySorterOptions = new InMemorySorter.Options();

    inMemorySorterOptions.setMemoryMB(options.getMemoryMB());



    return new BufferedExternalSorter(

        ExternalSorter.create(externalSorterOptions), InMemorySorter.create(inMemorySorterOptions));

  }



  @Override

  public void add(KV<byte[], byte[]> record) throws IOException {

    if (!inMemorySorterFull) {

      if (inMemorySorter.addIfRoom(record)) {

        return;

      } else {

        // Flushing contents of in memory sorter to external sorter so we can rely on external

        // from here on out

        inMemorySorterFull = true;

        transferToExternalSorter();

      }

    }



    // In memory sorter is full, so put in external sorter instead

    externalSorter.add(record);

  }



  /**

   * Transfers all of the records loaded so far into the in memory sorter over to the external

   * sorter.

   */

  private void transferToExternalSorter() throws IOException {

    for (KV<byte[], byte[]> record : inMemorySorter.sort()) {

      externalSorter.add(record);

    }

    // Allow in memory sorter and its contents to be garbage collected

    inMemorySorter = null;

  }



  @Override

  public Iterable<KV<byte[], byte[]>> sort() throws IOException {

    if (!inMemorySorterFull) {

      return inMemorySorter.sort();

    } else {

      return externalSorter.sort();

    }

  }

}